ELB のアクセスログを Parquet 形式に変換して、Amazon Athena からクエリする #reinvent
こんにちは、藤本です。
以前、Amazon AthenaでELBログをSQLで解析する #reinventで ELB のアクセスログを Amazon Athena からクエリする方法をにしざわがご紹介しました。
Amazon Athena はクエリ先のデータサイズや、データの形式、パーティションの構成などでパフォーマンス、コスト効率が変わります。
そこで Amazon Athena はパフォーマンス、コスト効率が良いカラムナ指向フォーマットの Parquet 形式をサポートしています。Parquet に関してはAmazon Athena: カラムナフォーマット『Parquet』でクエリを試してみた #reinventをご参照ください。
パフォーマンスに関してはクエリ方法によって変わりますが、料金に関わるスキャンサイズに関しては多くのケースで大きく削減が確認されています。
AWS が ELB のアクセスログで試したケースでは下記のような差異を確認しています。
データセット | Amazon S3上のサイズ | クエリ実行時間 | スキャンされたデータ | コスト |
---|---|---|---|---|
テキストファイルで保存されたデータ | 1 TB | 236 秒 | 1.15 TB | $5.75 |
Apache Parquet形式で保存されたデータ | 130 GB | 6.78 秒 | 2.51 GB | $0.013 |
削減 / スピードアップ | Parquetで87%削減 | 34倍高速 | 99%減のデータしかスキャンされない | 99.7%削減 |
そこで今回は下記ブログで紹介されている ELB のアクセスログを Parquet 形式に変換する手順を試してみました。
試してみた
今回の流れは以下のようになります。
- EMR クラスタ立ち上げ
- スクリプトダウンロード
- データ変換
- 変換したデータを S3 へ転送
- Amazon Athena からクエリ
EMR クラスタ立ち上げ
データ変換するスクリプトを実行する EMR クラスタを立ち上げます。
マネジメントコンソールから EMR の画面を遷移します。
Create cluster から立ち上げる EMR クラスタを設定します。
Go to advanced options から詳細設定します。
今回は Spark を利用するので Spark をチェックします。
インスタンスは Core node に r3.8xlarge を 4台利用しました。コスト節約でスポットインスタンスにしました。
オプションはそのまま
Master Node でコマンド実行するので、key pair を設定します。
10分ぐらいで起動しました。
スクリプトダウンロード
マスターノードへデータ変換するスクリプトをダウンロードします。
EMR クラスタが立ち上がったら、マスターノードへ SSH 接続します。
$ ssh -i <<private key>> hadoop@<<master node ip address>>
スクリプトは Github に公開されていますので、git コマンドでダウンロードします。
$ sudo yum install git -y Loaded plugins: priorities, update-motd, upgrade-helper : Installed: git.x86_64 0:2.7.4-1.47.amzn1 Dependency Installed: perl-Error.noarch 1:0.17020-2.9.amzn1 perl-Git.noarch 0:2.7.4-1.47.amzn1 perl-TermReadKey.x86_64 0:2.30-20.9.amzn1 Complete! $ git clone https://github.com/awslabs/aws-big-data-blog.git Cloning into 'aws-big-data-blog'... remote: Counting objects: 3190, done. remote: Compressing objects: 100% (10/10), done. remote: Total 3190 (delta 2), reused 0 (delta 0), pack-reused 3180 Receiving objects: 100% (3190/3190), 30.83 MiB | 21.08 MiB/s, done. Resolving deltas: 100% (826/826), done. Checking connectivity... done. $ cd aws-big-data-blog/aws-blog-spark-parquet-conversion/ $ tree . . ├── addpartitions.gvy ├── addpartitions.hql ├── convert2parquet.py ├── createtable.hql └── README.md 0 directories, 5 files
- createtable.hql : Hive のテーブル作成
- addpartitions.hql : テーブルにパーティション設定
- convert2parquet.py : データを取得し、Parquet 形式で出力する Spark スクリプト
データ変換
ダウンロードしたスクリプトを利用して ELB のアクセスログを Parquet 形式へデータ変換し、HDFS へ保存します。
今回はサンプルで用意されている ELB のアクセスログを利用します。
まずは HiveQL のスクリプトcreatetable.hql
、addpartitions.hql
を実行します。自身の環境で利用する場合はそれぞれの環境に合わせて hql ファイルを修正してください。弊社の Amazon Athena のブログをいくつか読めば、修正は簡単です。
テーブル作成
$ hive -f createtable.hql Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false OK Time taken: 2.355 seconds OK Time taken: 4.562 seconds
パーティション作成
$ hive -f addpartitions.hql Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: false OK Time taken: 4.217 seconds OK Time taken: 0.3 seconds : OK Time taken: 0.394 seconds OK Time taken: 0.24 seconds
データ変換
ELB のアクセスログを Python 実装の Sparkアプリケーションにより Parquet 形式に変換します。
$ spark-submit --num-executors 85 --executor-memory 5g convert2parquet.py 16/12/27 15:49:34 INFO SparkContext: Running Spark version 2.0.2 16/12/27 15:49:35 INFO SecurityManager: Changing view acls to: hadoop 16/12/27 15:49:35 INFO SecurityManager: Changing modify acls to: hadoop 16/12/27 15:49:35 INFO SecurityManager: Changing view acls groups to: 16/12/27 15:49:35 INFO SecurityManager: Changing modify acls groups to: 16/12/27 15:49:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set() 16/12/27 15:49:35 INFO Utils: Successfully started service 'sparkDriver' on port 33278. : 16/12/27 16:37:40 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false) 16/12/27 16:37:40 INFO YarnClientSchedulerBackend: Stopped 16/12/27 16:37:40 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 16/12/27 16:37:40 INFO MemoryStore: MemoryStore cleared 16/12/27 16:37:40 INFO BlockManager: BlockManager stopped 16/12/27 16:37:40 INFO BlockManagerMaster: BlockManagerMaster stopped 16/12/27 16:37:40 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 16/12/27 16:37:40 INFO SparkContext: Successfully stopped SparkContext 16/12/27 16:37:40 INFO ShutdownHookManager: Shutdown hook called 16/12/27 16:37:40 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-6807a65e-8e69-42d7-bafe-a272b87394a3 16/12/27 16:37:40 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-6807a65e-8e69-42d7-bafe-a272b87394a3/pyspark-bcdb1647-2213-4f14-b987-30c12c4fee8a
1時間弱で完了しました。
変換したデータを S3 へ転送
Apache DistCp の拡張コマンドs3-dist-cp
を利用して、HDFS に置かれた変換したデータを S3 へ転送します。
$ s3-dist-cp --src="hdfs:///user/hadoop/elblogs_pq" --dest="s3://<<bucket name>>/<<key prefix>>/" 16/12/27 22:58:40 INFO s3distcp.S3DistCp: Running with args: -libjars /usr/share/aws/emr/s3-dist-cp/lib/guava-15.0.jar,/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.4.0.jar,/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src=hdfs:///user/hadoop/elblogs_pq --dest=s3://<<bucket name>>/ 16/12/27 22:58:41 INFO s3distcp.S3DistCp: S3DistCp args: --src=hdfs:///user/hadoop/elblogs_pq --dest=s3://<<bucket name>>/ 16/12/27 22:58:41 INFO s3distcp.S3DistCp: Using output path 'hdfs:/tmp/a5dbe6ce-6168-4f7a-a555-5d9473eed13c/output' : 16/12/27 23:00:51 INFO mapreduce.Job: Job job_1482853503455_0004 completed successfully 16/12/27 23:00:51 INFO mapreduce.Job: Counters: 54 File System Counters FILE: Number of bytes read=39055 FILE: Number of bytes written=12609556 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=139824014636 HDFS: Number of bytes written=0 HDFS: Number of read operations=655 HDFS: Number of large read operations=0 HDFS: Number of write operations=190 S3: Number of bytes read=0 S3: Number of bytes written=139823896747 S3: Number of read operations=0 S3: Number of large read operations=0 S3: Number of write operations=0 Job Counters Launched map tasks=1 Launched reduce tasks=95 Rack-local map tasks=1 Total time spent by all maps in occupied slots (ms)=308570 Total time spent by all reduces in occupied slots (ms)=1261182348 Total time spent by all map tasks (ms)=2615 Total time spent by all reduce tasks (ms)=5343993 Total vcore-milliseconds taken by all map tasks=2615 Total vcore-milliseconds taken by all reduce tasks=5343993 Total megabyte-milliseconds taken by all map tasks=9874240 Total megabyte-milliseconds taken by all reduce tasks=40357835136 Map-Reduce Framework Map input records=366 Map output records=366 Map output bytes=152740 Map output materialized bytes=38675 Input split bytes=155 Combine input records=0 Combine output records=0 Reduce input groups=366 Reduce shuffle bytes=38675 Reduce input records=366 Reduce output records=0 Spilled Records=732 Shuffled Maps =95 Failed Shuffles=0 Merged Map outputs=95 GC time elapsed (ms)=40870 CPU time spent (ms)=4191750 Physical memory (bytes) snapshot=111239815168 Virtual memory (bytes) snapshot=825331716096 Total committed heap usage (bytes)=231069974528 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=117734 File Output Format Counters Bytes Written=0 16/12/27 23:00:51 INFO s3distcp.S3DistCp: Try to recursively delete hdfs:/tmp/a5dbe6ce-6168-4f7a-a555-5d9473eed13c/tempspace
3分ほどで完了です。
ちなみにデータサイズを比較したところ、12% ぐらいになっていました。
- 元データサイズ: 1.14 TB
- 変換後データサイズ: 0.14 TB
Amazon Athena からクエリ
S3 へアップロードした Parquet 形式の ELB アクセスログへ Amazon Athena からクエリしてみましょう。
Athena のテーブルを作成します。データベースは default を利用しています。
CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs_pq ( request_timestamp string, elb_name string, request_ip string, request_port int, backend_ip string, backend_port int, request_processing_time double, backend_processing_time double, client_response_time double, elb_response_code string, backend_response_code string, received_bytes bigint, sent_bytes bigint, request_verb string, url string, protocol string, user_agent string, ssl_cipher string, ssl_protocol string ) PARTITIONED BY(year int, month int, day int) STORED AS PARQUET LOCATION 's3://<<bucket name>>/' tblproperties ("parquet.compress"="SNAPPY");
続いて、パーティションを設定します。今回のスクリプトでは year=xxxx/month=xx/day=xx
という形式のパスとなっているので、パーティションを自動生成できます。
msck repair table elb_logs_pq
パーティションが生成されました。
それではクエリしてみましょう。ステータスコード毎のデータ件数を集計します。
圧縮ファイル、38億レコードの集計が 8秒強で返ってきました!驚異的ですね!
まとめ
Amazon Athena は使い方によって、コスト効率、パフォーマンスが大きく変わります。月1,2回クエリ頻度であれば、データ変換するコストの方がかかるかもしれません。QuickSight で色々な可視化をするようであれば、何度もクエリが発生すると思いますので、今回ご紹介したような方法でコスト効率を向上させることができるかと思います。